{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sc = SparkContext(conf=SparkConf())\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Example data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+---+---+---+---+\n",
      "| x1|    x2| x3| x4| y1| y2|\n",
      "+---+------+---+---+---+---+\n",
      "|  a| apple|  1|2.4|  1|yes|\n",
      "|  a|orange|  1|2.5|  0| no|\n",
      "|  b|orange|  2|3.5|  1| no|\n",
      "|  b|orange|  2|1.4|  0|yes|\n",
      "|  b| peach|  2|2.1|  0|yes|\n",
      "|  c| peach|  4|1.5|  1|yes|\n",
      "+---+------+---+---+---+---+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "pdf = pd.DataFrame({\n",
    "        'x1': ['a','a','b','b', 'b', 'c'],\n",
    "        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],\n",
    "        'x3': [1, 1, 2, 2, 2, 4],\n",
    "        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],\n",
    "        'y1': [1, 0, 1, 0, 0, 1],\n",
    "        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']\n",
    "    })\n",
    "df = spark.createDataFrame(pdf)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# StringIndexer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`StringIndexer` maps a string column to a index column that will be treated as a categorical column by spark. **The indices start with 0 and are ordered by label frequencies**. If it is a numerical column, the column will first be casted to a string column and then indexed by  StringIndexer.\n",
    "\n",
    "There are three steps to implement the StringIndexer\n",
    "\n",
    "1. Build the StringIndexer model: specify the input column and output column names.\n",
    "2. Learn the StringIndexer model: fit the model with your data.\n",
    "3. Execute the indexing: call the transform function to execute the indexing process."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Example: `StringIndex` column \"x1\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+---+---+---+---+----------+\n",
      "| x1|    x2| x3| x4| y1| y2|indexed_x1|\n",
      "+---+------+---+---+---+---+----------+\n",
      "|  a| apple|  1|2.4|  1|yes|       1.0|\n",
      "|  a|orange|  1|2.5|  0| no|       1.0|\n",
      "|  b|orange|  2|3.5|  1| no|       0.0|\n",
      "|  b|orange|  2|1.4|  0|yes|       0.0|\n",
      "|  b| peach|  2|2.1|  0|yes|       0.0|\n",
      "|  c| peach|  4|1.5|  1|yes|       2.0|\n",
      "+---+------+---+---+---+---+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StringIndexer\n",
    "\n",
    "# build indexer\n",
    "string_indexer = StringIndexer(inputCol='x1', outputCol='indexed_x1')\n",
    "\n",
    "# learn the model\n",
    "string_indexer_model = string_indexer.fit(df)\n",
    "\n",
    "# transform the data\n",
    "df_stringindexer = string_indexer_model.transform(df)\n",
    "\n",
    "# resulting df\n",
    "df_stringindexer.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "From the result above, we can see that (a, b, c) in column x1 are converted to (1.0, 0.0, 2.0). They are ordered by their frequencies in column x1.\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# OneHotEncoder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**`OneHotEncoder`** converts each categories of a **StringIndexed** column to a `sparse vector`. Each sparse vector has **at most one single active elements** that indicate the category index."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+\n",
      "| x1|\n",
      "+---+\n",
      "|  a|\n",
      "|  a|\n",
      "|  b|\n",
      "|  b|\n",
      "|  b|\n",
      "|  c|\n",
      "+---+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_ohe = df.select('x1')\n",
    "df_ohe.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `StringIndex` column 'x1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+\n",
      "| x1|indexed_x1|\n",
      "+---+----------+\n",
      "|  a|       1.0|\n",
      "|  a|       1.0|\n",
      "|  b|       0.0|\n",
      "|  b|       0.0|\n",
      "|  b|       0.0|\n",
      "|  c|       2.0|\n",
      "+---+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_x1_indexed = StringIndexer(inputCol='x1', outputCol='indexed_x1').fit(df_ohe).transform(df_ohe)\n",
    "df_x1_indexed.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "'x1' has three categories: 'a', 'b' and 'c',  which corresponding string indices 1.0, 0.0 and 2.0, respectively."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Mapping string indices to sparse vectors"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* Encoding format: 'string index': ['string indices vector size', 'index of string index in string indices vector', **1.0** ]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here the string indices vector is `[0.0, 1.0, 2.0]`. Therefore, the mapping between string indices and sparse vectors are:\n",
    "* `0.0: [3, [0], [1.0]]`\n",
    "* `1.0: [3, [1], [1.0]]`\n",
    "* `2.0: [3, [2], [1.0]]`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After we convert all sparse vectors to dense vectors, we get:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([[ 1.,  0.,  0.],\n",
       "       [ 0.,  1.,  0.],\n",
       "       [ 0.,  0.,  1.]])"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml.linalg import DenseVector, SparseVector, DenseMatrix, SparseMatrix\n",
    "x = [SparseVector(3, {0: 1.0}).toArray()] + \\\n",
    "    [SparseVector(3, {1: 1.0}).toArray()] + \\\n",
    "    [SparseVector(3, {2: 1.0}).toArray()]\n",
    "\n",
    "import numpy as np\n",
    "np.array(x)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**The obtained matrix is exactly the matrix that we would use to represent our categorical variable in a statistical class**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### One more step to go\n",
    "\n",
    "`OneHotEncoder` by default will drop the last category. So the **string indices vector** becomes `[0.0, 1.0]`, and the mappings between string indices and sparse vectors are:\n",
    "\n",
    "* `0.0: [2, [0], [1.0]]`\n",
    "* `1.0: [2, [1], [1.0]]`\n",
    "* `2.0: [2, [], []]`\n",
    "\n",
    "We use a sparse vector that has **no active element**(basically all elements are 0's) to represent the last category."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Verify"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### OneHotEncode column 'indexed_x1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import OneHotEncoder"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+-------------+\n",
      "| x1|indexed_x1|   encoded_x1|\n",
      "+---+----------+-------------+\n",
      "|  a|       1.0|(2,[1],[1.0])|\n",
      "|  a|       1.0|(2,[1],[1.0])|\n",
      "|  b|       0.0|(2,[0],[1.0])|\n",
      "|  b|       0.0|(2,[0],[1.0])|\n",
      "|  b|       0.0|(2,[0],[1.0])|\n",
      "|  c|       2.0|    (2,[],[])|\n",
      "+---+----------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "OneHotEncoder(inputCol='indexed_x1', outputCol='encoded_x1').transform(df_x1_indexed).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Specify to not drop the last category\n",
    "\n",
    "If we choose to not drop the last category, we get the expected results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----------+-------------+\n",
      "| x1|indexed_x1|   encoded_x1|\n",
      "+---+----------+-------------+\n",
      "|  a|       1.0|(3,[1],[1.0])|\n",
      "|  a|       1.0|(3,[1],[1.0])|\n",
      "|  b|       0.0|(3,[0],[1.0])|\n",
      "|  b|       0.0|(3,[0],[1.0])|\n",
      "|  b|       0.0|(3,[0],[1.0])|\n",
      "|  c|       2.0|(3,[2],[1.0])|\n",
      "+---+----------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "OneHotEncoder(dropLast=False, inputCol='indexed_x1', outputCol='encoded_x1').transform(df_x1_indexed).show()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
